/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.predictionio.data.storage.jdbc

import grizzled.slf4j.Logging
import org.apache.predictionio.data.storage.DataMap
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.storage.LEvents
import org.apache.predictionio.data.storage.StorageClientConfig
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.json4s.JObject
import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
import scalikejdbc._

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

/** JDBC implementation of [[LEvents]] */
class JDBCLEvents(
    client: String,
    config: StorageClientConfig,
    namespace: String) extends LEvents with Logging {
  implicit private val formats = org.json4s.DefaultFormats

  override def init(appId: Int, channelId: Option[Int] = None): Boolean = {

    // To use index, it must be varchar less than 255 characters on a VARCHAR column
    val useIndex = config.properties.contains("INDEX") &&
      config.properties("INDEX").equalsIgnoreCase("enabled")

    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
    val entityIdIndexName = s"idx_${tableName}_ei"
    val entityTypeIndexName = s"idx_${tableName}_et"
    DB autoCommit { implicit session =>
      if (useIndex) {
        SQL(s"""
      create table if not exists $tableName (
        id varchar(32) not null primary key,
        event varchar(255) not null,
        entityType varchar(255) not null,
        entityId varchar(255) not null,
        targetEntityType text,
        targetEntityId text,
        properties text,
        eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
        eventTimeZone varchar(50) not null,
        tags text,
        prId text,
        creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
        creationTimeZone varchar(50) not null)""").execute().apply()

        // create index
        SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply()
        SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply()
      } else {
        SQL(s"""
      create table if not exists $tableName (
        id varchar(32) not null primary key,
        event text not null,
        entityType text not null,
        entityId text not null,
        targetEntityType text,
        targetEntityId text,
        properties text,
        eventTime timestamp DEFAULT CURRENT_TIMESTAMP,
        eventTimeZone varchar(50) not null,
        tags text,
        prId text,
        creationTime timestamp DEFAULT CURRENT_TIMESTAMP,
        creationTimeZone varchar(50) not null)""").execute().apply()
      }
      true
    }
  }

  override def remove(appId: Int, channelId: Option[Int] = None): Boolean =
    DB autoCommit { implicit session =>
      SQL(s"""
      drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
      """).execute().apply()
      true
    }

  override def close(): Unit = ConnectionPool.closeAll()

  override def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
    implicit ec: ExecutionContext): Future[String] = Future {
    DB localTx { implicit session =>
      val id = event.eventId.getOrElse(JDBCUtils.generateId)
      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
      sql"""
      insert into $tableName values(
        $id,
        ${event.event},
        ${event.entityType},
        ${event.entityId},
        ${event.targetEntityType},
        ${event.targetEntityId},
        ${write(event.properties.toJObject)},
        ${event.eventTime},
        ${event.eventTime.getZone.getID},
        ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None},
        ${event.prId},
        ${event.creationTime},
        ${event.creationTime.getZone.getID}
      )
      """.update().apply()
      id
    }
  }

  override def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])(
    implicit ec: ExecutionContext): Future[Seq[String]] = Future {
    DB localTx { implicit session =>
      val ids = events.map(_.eventId.getOrElse(JDBCUtils.generateId))
      val params = events.zip(ids).map { case (event, id) =>
        Seq(
          'id               -> id,
          'event            -> event.event,
          'entityType       -> event.entityType,
          'entityId         -> event.entityId,
          'targetEntityType -> event.targetEntityType,
          'targetEntityId   -> event.targetEntityId,
          'properties       -> write(event.properties.toJObject),
          'eventTime        -> event.eventTime,
          'eventTimeZone    -> event.eventTime.getZone.getID,
          'tags             -> (if(event.tags.nonEmpty) Some(event.tags.mkString(",")) else None),
          'prId             -> event.prId,
          'creationTime     -> event.creationTime,
          'creationTimeZone -> event.creationTime.getZone.getID
        )
      }

      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
      sql"""
      insert into $tableName values(
        {id},
        {event},
        {entityType},
        {entityId},
        {targetEntityType},
        {targetEntityId},
        {properties},
        {eventTime},
        {eventTimeZone},
        {tags},
        {prId},
        {creationTime},
        {creationTimeZone}
      )
      """.batchByName(params: _*).apply()

      ids
    }
  }

  override def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
    implicit ec: ExecutionContext): Future[Option[Event]] = Future {
    DB readOnly { implicit session =>
      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
      sql"""
      select
        id,
        event,
        entityType,
        entityId,
        targetEntityType,
        targetEntityId,
        properties,
        eventTime,
        eventTimeZone,
        tags,
        prId,
        creationTime,
        creationTimeZone
      from $tableName
      where id = $eventId
      """.map(resultToEvent).single().apply()
    }
  }

  override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
    implicit ec: ExecutionContext): Future[Boolean] = Future {
    DB localTx { implicit session =>
      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
      sql"""
      delete from $tableName where id = $eventId
      """.update().apply()
      true
    }
  }

  override def futureFind(
      appId: Int,
      channelId: Option[Int] = None,
      startTime: Option[DateTime] = None,
      untilTime: Option[DateTime] = None,
      entityType: Option[String] = None,
      entityId: Option[String] = None,
      eventNames: Option[Seq[String]] = None,
      targetEntityType: Option[Option[String]] = None,
      targetEntityId: Option[Option[String]] = None,
      limit: Option[Int] = None,
      reversed: Option[Boolean] = None
    )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future {
    DB readOnly { implicit session =>
      val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
      val whereClause = sqls.toAndConditionOpt(
        startTime.map(x => sqls"eventTime >= $x"),
        untilTime.map(x => sqls"eventTime < $x"),
        entityType.map(x => sqls"entityType = $x"),
        entityId.map(x => sqls"entityId = $x"),
        eventNames.map(x =>
          sqls.toOrConditionOpt(x.map(y =>
            Some(sqls"event = $y")
          ): _*)
        ).getOrElse(None),
        targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y")
            .getOrElse(sqls"targetEntityType IS NULL")),
        targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y")
            .getOrElse(sqls"targetEntityId IS NULL"))
      ).map(sqls.where(_)).getOrElse(sqls"")
      val orderByClause = reversed.map(x =>
        if (x) sqls"eventTime desc" else sqls"eventTime asc"
      ).getOrElse(sqls"eventTime asc")
      val limitClause = limit.map(x =>
        if (x < 0) sqls"" else sqls.limit(x)
      ).getOrElse(sqls"")
      val q = sql"""
      select
        id,
        event,
        entityType,
        entityId,
        targetEntityType,
        targetEntityId,
        properties,
        eventTime,
        eventTimeZone,
        tags,
        prId,
        creationTime,
        creationTimeZone
      from $tableName
      $whereClause
      order by $orderByClause
      $limitClause
      """
      q.map(resultToEvent).list().apply().toIterator
    }
  }

  private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = {
    Event(
      eventId = rs.stringOpt("id"),
      event = rs.string("event"),
      entityType = rs.string("entityType"),
      entityId = rs.string("entityId"),
      targetEntityType = rs.stringOpt("targetEntityType"),
      targetEntityId = rs.stringOpt("targetEntityId"),
      properties = rs.stringOpt("properties").map(p =>
        DataMap(read[JObject](p))).getOrElse(DataMap()),
      eventTime = new DateTime(rs.jodaDateTime("eventTime"),
        DateTimeZone.forID(rs.string("eventTimeZone"))),
      tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil),
      prId = rs.stringOpt("prId"),
      creationTime = new DateTime(rs.jodaDateTime("creationTime"),
        DateTimeZone.forID(rs.string("creationTimeZone")))
    )
  }
}
